package com.chenzhiling.study.Io


import com.google.common.io.ByteStreams
import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, FileSystem, Path}
import org.apache.commons.io.output.ByteArrayOutputStream
import org.apache.commons.io.{FileUtils, IOUtils}
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.SparkSession

import java.io.{File, FileInputStream, IOException, InputStream, RandomAccessFile}
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.nio.channels.FileChannel.MapMode
import java.nio.file.Files
import scala.collection.mutable.ArrayBuffer
/**
 * @Author: CHEN ZHI LING
 * @Date: 2021/7/22
 * @Description: Compares several ways of reading a file into a byte array
 *               (java.nio, Apache Commons IO, Guava, RandomAccessFile, manual
 *               stream copy, memory mapping) and utilities for splitting a
 *               stream into fixed-size chunks.
 */
object FileToArrayByte {
  /** Maximum size (1 MiB) of a single chunk produced by the split helpers. */
  private val ONE_ROW_MAX_ARRAY_SIZE: Int = 1024*1024*1
  val filePath = "/home/chenzhiling/桌面/json/15.json"

  /**
   * Reads a whole file using the java.nio `Files` API
   * (internally a channel-based transfer).
   * @param path file path
   * @return the file content as a byte array
   */
  def methodOne(path: String): Array[Byte] = {
    val file = new File(path)
    Files.readAllBytes(file.toPath)
  }

  /**
   * Reads a whole file via Apache Commons IO `IOUtils`
   * (backed by a ByteArrayOutputStream; limited to 2147483647 bytes).
   * @param path file path
   * @return the file content as a byte array
   */
  def methodTwo(path: String): Array[Byte] = {
    val stream = new FileInputStream(path)
    // Close the stream even when toByteArray throws, to avoid a descriptor leak.
    try IOUtils.toByteArray(stream)
    finally stream.close()
  }

  /**
   * Reads a whole file via Apache Commons IO `FileUtils`
   * (same backing as methodFour but reads in a loop; limited to 2147483647 bytes).
   * @param path file path
   * @return the file content as a byte array
   */
  def methodThree(path: String): Array[Byte] = {
    val file = new File(path)
    FileUtils.readFileToByteArray(file)
  }

  /**
   * Reads a whole file via Google Guava `ByteStreams`
   * (backed by a ByteArrayOutputStream).
   * @param path file path
   * @return the file content as a byte array
   */
  def methodFour(path: String): Array[Byte] = {
    val stream = new FileInputStream(new File(path))
    // Close the stream even when toByteArray throws, to avoid a descriptor leak.
    try ByteStreams.toByteArray(stream)
    finally stream.close()
  }

  /**
   * Reads a whole file with `RandomAccessFile.readFully`.
   * NOTE: files larger than Int.MaxValue bytes cannot fit in a single array.
   * @param path file path
   * @return the file content as a byte array
   */
  def methodFive(path: String): Array[Byte] = {
    val file = new RandomAccessFile(path, "r")
    try {
      val bytes = new Array[Byte](file.length().toInt)
      file.readFully(bytes)
      bytes
    } finally file.close()
  }

  /**
   * Manual stream copy through an intermediate buffer.
   * The data is copied three times: file -> buffer -> ByteArrayOutputStream -> array.
   * @param path file path
   * @return the file content as a byte array
   */
  def methodSix(path: String): Array[Byte] = {
    // Commons IO ByteArrayOutputStream; close() is a no-op, kept for symmetry.
    val out = new ByteArrayOutputStream()
    var in: FileInputStream = null
    try {
      in = new FileInputStream(new File(path))
      val buffer = new Array[Byte](4096)
      var bytesRead: Int = in.read(buffer)
      while (bytesRead >= 0) {
        out.write(buffer, 0, bytesRead)
        bytesRead = in.read(buffer)
      }
      out.toByteArray
    } catch {
      // Preserve the original exception as the cause instead of discarding it.
      case e: IOException => throw new IOException("失败", e)
    } finally {
      if (null != in) in.close()
      out.close()
    }
  }

  /**
   * Reads directly into a pre-sized array, avoiding the two extra copies of methodSix.
   * @param path file path
   * @return the file content as a byte array
   */
  def methodSeven(path: String): Array[Byte] = {
    val file = new File(path)
    val bytes = new Array[Byte](file.length().toInt)
    var in: FileInputStream = null
    try {
      in = new FileInputStream(file)
      // A single read() call is not guaranteed to fill the array;
      // loop until the array is full or EOF is reached.
      var offset = 0
      var read = in.read(bytes, offset, bytes.length - offset)
      while (read > 0) {
        offset += read
        read =
          if (offset < bytes.length) in.read(bytes, offset, bytes.length - offset)
          else -1
      }
      if (offset < bytes.length) {
        // EOF before the expected length: the file shrank while being read.
        throw new IOException("失败")
      }
    } catch {
      // Attach the cause; the original `new IOException{"失败"}` built an
      // anonymous subclass with an EMPTY message (the string was a dead expression).
      case e: IOException => throw new IOException("失败", e)
    } finally {
      if (null != in) in.close()
    }
    bytes
  }

  /**
   * Memory-mapped read through a java.nio channel.
   * @param path file path
   * @return the file content as a byte array
   */
  def methodEight(path: String): Array[Byte] = {
    var in: FileInputStream = null
    var channel: FileChannel = null
    try {
      in = new FileInputStream(path)
      channel = in.getChannel
      val fileLength: Long = channel.size()
      val buffer: MappedByteBuffer = channel.map(MapMode.READ_ONLY, 0, fileLength)
      val bytes = new Array[Byte](fileLength.toInt)
      buffer.get(bytes)
      bytes
    } catch {
      // Preserve the original exception as the cause instead of discarding it.
      case e: IOException => throw new IOException("失败", e)
    } finally {
      // Close the channel before its owning stream.
      if (null != channel) channel.close()
      if (null != in) in.close()
    }
  }

  /**
   * Splits a stream into chunks of at most [[ONE_ROW_MAX_ARRAY_SIZE]] bytes.
   * @param stream   byte stream
   * @param fileSize total file size in bytes
   * @return list of chunk arrays (the last one sized to the remainder)
   */
  def splitStream(stream: FSDataInputStream, fileSize: Long): List[Array[Byte]] = {
    // Exact integer ceiling division; avoids the precision loss of
    // Math.ceil(...).toInt for very large sizes.
    val numOfChunks: Long = (fileSize + ONE_ROW_MAX_ARRAY_SIZE - 1) / ONE_ROW_MAX_ARRAY_SIZE
    val result = new ArrayBuffer[Array[Byte]]()
    for (i <- 0L until numOfChunks) {
      val start: Long = i * ONE_ROW_MAX_ARRAY_SIZE
      val length: Long = Math.min(fileSize - start, ONE_ROW_MAX_ARRAY_SIZE)
      result.append(splitStream(stream, length.toInt))
    }
    result.toList
  }

  /**
   * Reads exactly `size` bytes (or until EOF) from the stream into a new array.
   * NOTE: if the stream ends early the tail of the array stays zero-filled.
   * @param stream stream to read from
   * @param size   desired array size in bytes
   * @return byte array of length `size`
   */
  def splitStream(stream: FSDataInputStream, size: Int): Array[Byte] = {
    val bytes = new Array[Byte](size)
    var hasReadLength = 0 // bytes consumed so far
    // read() may return fewer bytes than requested, so keep reading until
    // the buffer is full (remaining length 0 => read returns 0) or EOF (-1).
    var readLength = stream.read(bytes, hasReadLength, size - hasReadLength)
    while (readLength > 0) {
      hasReadLength += readLength
      readLength = stream.read(bytes, hasReadLength, size - hasReadLength)
    }
    bytes
  }

  /**
   * Splits a plain InputStream into chunks; used for remote servers
   * where FSDataInputStream is not available.
   * @param stream   file stream
   * @param fileSize total file size in bytes
   * @return iterator over chunk arrays
   */
  def splitRemoteStream(stream: InputStream, fileSize: Long): Iterator[Array[Byte]] = {
    // Exact integer ceiling division (see splitStream above).
    val numOfChunks: Long = (fileSize + ONE_ROW_MAX_ARRAY_SIZE - 1) / ONE_ROW_MAX_ARRAY_SIZE
    val result = new ArrayBuffer[Array[Byte]]()
    for (i <- 0L until numOfChunks) {
      val start: Long = i * ONE_ROW_MAX_ARRAY_SIZE
      val length: Long = Math.min(fileSize - start, ONE_ROW_MAX_ARRAY_SIZE)
      result.append(splitRemoteStream(stream, length.toInt))
    }
    result.toIterator
  }

  /**
   * Reads exactly `size` bytes (or until EOF) from a plain InputStream; used for
   * remote servers where FSDataInputStream is not available.
   * NOTE: if the stream ends early the tail of the array stays zero-filled.
   * @param stream stream to read from
   * @param size   desired array size in bytes
   * @return byte array of length `size`
   */
  def splitRemoteStream(stream: InputStream, size: Int): Array[Byte] = {
    val bytes: Array[Byte] = new Array[Byte](size)
    var hasReadLength: Int = 0 // bytes consumed so far
    // Loop because read() may return fewer bytes than requested.
    var readLength: Int = stream.read(bytes, hasReadLength, size - hasReadLength)
    while (readLength > 0) {
      hasReadLength += readLength
      readLength = stream.read(bytes, hasReadLength, size - hasReadLength)
    }
    bytes
  }

  /** Demo entry point: splits [[filePath]] into chunks and prints the count. */
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().master("local").appName("ArrayTest").getOrCreate()
    try {
      val configuration: Configuration = spark.sparkContext.hadoopConfiguration
      val path = new Path(filePath)
      val fs: FileSystem = path.getFileSystem(configuration)
      val status: FileStatus = fs.getFileStatus(path)
      val len: Long = status.getLen
      val stream: FSDataInputStream = fs.open(status.getPath)
      try {
        val list: List[Array[Byte]] = FileToArrayByte.splitStream(stream, len)
        println(list.size)
      } finally stream.close() // release the HDFS stream
    } finally spark.stop() // always shut the session down
  }
}
